<h1 id=sparksql><center>SparkSQL</center></h1>
<h2 id=1-spark-sql>Chapter 1: Spark SQL Overview</h2>
<h2 id=spark-sql>What is Spark SQL?</h2>
<p>Spark SQL is Spark's module for processing structured data. It provides two programming abstractions, DataFrame and DataSet, and also serves as a distributed SQL query engine.</p>
<p>We have already studied Hive, which converts Hive SQL into MapReduce jobs and submits them to the cluster, greatly reducing the complexity of writing MapReduce programs; the MapReduce computation model, however, executes slowly. Spark SQL arose for this reason: it converts SQL queries into RDD operations and submits them to the cluster, so execution is very fast.</p>
<h2 id=spark-sql_1>Features of Spark SQL</h2>
<p>1) Easy to integrate</p> <p><img alt src=60864a67d4105b1bea008a0f542c962c.png></p>
<p>2) Uniform data access</p> <p><img alt src=d2e031ca75e47ae95b2ccd1b9a3dab20.png></p>
<p>3) Hive compatibility</p> <p><img alt src=86a36a80c9e365b79399c6ec5c91fef4.png></p>
<p>4) Standard data connectivity</p> <p><img alt src=af865a1427dd8facde4e3bd783aa4006.png></p>
<h2 id=dataframe>What is a DataFrame?</h2>
<p>Like an RDD, a DataFrame is a distributed data container. A DataFrame, however, is more like a two-dimensional table in a traditional database: besides the data itself, it also records the data's structural information, i.e. its schema. Like Hive, a DataFrame supports nested data types (struct, array and map). In terms of API usability, the DataFrame API offers a set of high-level relational operations that are friendlier and have a lower barrier to entry than the functional RDD API.</p>
<p><img alt src=54610f11306f225842198f153310f331.png></p>
<p>The figure above shows the difference between a DataFrame and an RDD. Although the RDD[Person] on the left takes Person as its type parameter, the Spark framework itself knows nothing about the internal structure of the Person class. The DataFrame on the right, by contrast, carries detailed structural information, so Spark SQL knows exactly which columns the dataset contains and each column's name and type. A DataFrame provides a schema view of the data and can be treated like a database table. A DataFrame is also lazily evaluated, and its performance is higher than an RDD's, mainly because of its optimized execution plan: query plans are optimized by the Spark Catalyst optimizer.</p>
<p><img alt src=8ff6a2d95adc72b857fe2e072c739750.png></p>
<p>Consider the following example:</p>
<p><img alt src=47ef3b0329aca64a27ab1f3c6e31da6f.png></p>
<p><img alt src=8e740b43d5f2144d7261108f04a67936.png></p>
<p>To illustrate query optimization, look at the population analysis shown above. Two DataFrames are constructed, joined, and then filtered. Executing this plan as-is would be inefficient: a join is an expensive operation and may produce a large dataset. If the filter is pushed down below the join, each DataFrame is filtered first and the smaller filtered results are joined, which can significantly shorten execution time; this is exactly what Spark SQL's query optimizer does. In short, logical query plan optimization is a process of applying equivalence transformations from relational algebra to replace high-cost operations with low-cost ones.</p>
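<p>Catalyst's work can be observed directly with <code>explain</code>. The following is a minimal spark-shell sketch (it assumes the sample <code>people.json</code> shipped with Spark and a SparkSession named <code>spark</code>); in the optimized logical plan the age filter should appear below the join:</p>
<pre><code>import spark.implicits._   // enables the $"col" column syntax

val people = spark.read.json("examples/src/main/resources/people.json")
val names  = people.select($"name")

// explain(true) prints the parsed, analyzed, optimized and physical plans;
// Catalyst pushes the age predicate down below the join
people.join(names, "name").filter($"age" > 21).explain(true)
</code></pre>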
<h2 id=dataset>What is a DataSet?</h2>
<p>1) A DataSet is an extension of the DataFrame API and is Spark's newest data abstraction.</p>
<p>2) It has a user-friendly API style, offering both compile-time type-safety checks and the query optimization of DataFrames.</p>
<p>3) Datasets support encoders, which avoid deserializing an entire object when accessing off-heap data, improving efficiency.</p>
<p>4) Case classes are used to define the structure of the data in a Dataset; each case-class attribute name maps directly to a field name in the DataSet.</p>
<p>5) A DataFrame is a special case of Dataset: DataFrame = Dataset[Row], so a DataFrame can be converted to a Dataset with the as method. Row is a type like Car or Person; all table-structure information is represented by Row.</p>
<p>6) A DataSet is strongly typed; you can have, for example, Dataset[Car] or Dataset[Person].</p>
<p>7) A DataFrame knows only its field names, not their types, so type errors cannot be caught at compile time: you could, say, subtract from a String and only get an error at run time. A DataSet knows both the field names and their types, so it offers much stricter error checking, analogous to the difference between a JSON object and a class instance.</p>
<h2 id=2-sparksql>Chapter 2: SparkSQL Programming</h2>
<h2 id=21-sparksession>2.1 SparkSession, the New Entry Point</h2>
<p>In older versions, SparkSQL offered two SQL query entry points: SQLContext, for Spark's own SQL queries, and HiveContext, for queries against Hive.</p>
<p>SparkSession is Spark's newest SQL query entry point. It is essentially the combination of SQLContext and HiveContext, so the APIs available on those two are equally available on SparkSession. SparkSession wraps a sparkContext internally, so the actual computation is still carried out by the sparkContext.</p>
<h2 id=22-dataframe>2.2 DataFrame</h2>
<h3 id=221>2.2.1 Creation</h3>
<p>In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame: from a Spark data source; by converting an existing RDD; or by querying a Hive table.</p>
<p>1) From a Spark data source</p>
<p>(1) List the file formats a DataFrame can be created from:</p>
<pre><code>scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
</code></pre>
<p>(2) Read a JSON file to create a DataFrame:</p>
<pre><code>scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
</code></pre>
<p>(3) Show the result:</p>
<pre><code>scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
</code></pre>
<p>2) By converting an RDD: discussed separately in section 2.2.4.</p>
<p>3) By querying a Hive table: discussed separately in section 3.5.</p>
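<p>Besides external sources, a DataFrame can also be built from a local collection for quick experiments; a minimal sketch (assuming <code>import spark.implicits._</code> has been done, as in 2.2.4 below):</p>
<pre><code>// build a two-row DataFrame from a local Seq of tuples and name the columns
val localDF = Seq(("Michael", 29), ("Andy", 30)).toDF("name", "age")
localDF.show()
</code></pre>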
<h3 id=222-sql>2.2.2 SQL-Style Syntax (primary)</h3>
<p>1) Create a DataFrame:</p>
<pre><code>scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
</code></pre>
<p>2) Register a temporary view on the DataFrame:</p>
<pre><code>scala> df.createOrReplaceTempView("people")
</code></pre>
<p>3) Query the whole table with a SQL statement:</p>
<pre><code>scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
</code></pre>
<p>4) Show the result:</p>
<pre><code>scala> sqlDF.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
</code></pre>
<p>Note: a temporary view is scoped to the Session; once the Session ends, the view is gone. If you need a view valid across the whole application, use a global view, and remember that global views must be referenced by their full path, e.g. global_temp.people.</p>
<p>5) Register a global view on the DataFrame:</p>
<pre><code>scala> df.createGlobalTempView("people")
</code></pre>
<p>6) Query the whole table with a SQL statement:</p>
<pre><code>scala> spark.sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
</code></pre>
<h3 id=223-dsl>2.2.3 DSL-Style Syntax (secondary)</h3>
<p>1) Create a DataFrame:</p>
<pre><code>scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
</code></pre>
<p>2) View the DataFrame's schema:</p>
<pre><code>scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
</code></pre>
<p>3) Select only the "name" column:</p>
<pre><code>scala> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
</code></pre>
<p>4) Select the "name" column together with "age + 1":</p>
<pre><code>scala> df.select($"name", $"age" + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
</code></pre>
<p>5) Filter rows where "age" is greater than 21:</p>
<pre><code>scala> df.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
</code></pre>
<p>6) Group by "age" and count the rows:</p>
<pre><code>scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
</code></pre>
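<p>These operators compose. A short sketch combining them (same df as above; the aggregate helpers come from org.apache.spark.sql.functions):</p>
<pre><code>import org.apache.spark.sql.functions._

df.filter($"age".isNotNull)                       // drop rows with null age
  .groupBy($"age")                                // group by age
  .agg(count("*").as("cnt"), avg($"age").as("avg_age"))
  .show()
</code></pre>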
<h3 id=224-rdddateframe>2.2.4 Converting an RDD to a DataFrame</h3>
<p>Note: whenever you convert between RDDs and DataFrames/DataSets, you need <code>import spark.implicits._</code> (here <strong>spark is not a package name but the name of the SparkSession object</strong>).</p>
<p>Prerequisite: <strong>import the implicit conversions and create an RDD</strong>:</p>
<pre><code>scala> import spark.implicits._
import spark.implicits._

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at &lt;console&gt;:27
</code></pre>
<p>1) By specifying the columns manually:</p>
<pre><code>scala> peopleRDD.map{x => val para = x.split(","); (para(0), para(1).trim.toInt)}.toDF("name", "age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
</code></pre>
<p>2) By reflection (requires a case class):</p>
<p>(1) Define a case class:</p>
<pre><code>scala> case class People(name: String, age: Int)
</code></pre>
<p>(2) Convert the RDD to a DataFrame using the case class:</p>
<pre><code>scala> peopleRDD.map{x => val para = x.split(","); People(para(0), para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
</code></pre>
<p>3) Programmatically (for reference):</p>
<p>(1) Import the required types:</p>
<pre><code>scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
</code></pre>
<p>(2) Create the schema:</p>
<pre><code>scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
</code></pre>
<p>(3) Import the Row type:</p>
<pre><code>scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
</code></pre>
<p>(4) Create an RDD of Rows matching the schema:</p>
<pre><code>scala> val data = peopleRDD.map{x => val para = x.split(","); Row(para(0), para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at &lt;console&gt;:33
</code></pre>
<p>(5) Create the DataFrame from the data and the schema:</p>
<pre><code>scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
</code></pre>
<h3 id=225-dateframerdd>2.2.5 Converting a DataFrame to an RDD</h3>
<p>Simply call rdd.</p>
<p>1) Create a DataFrame:</p>
<pre><code>scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
</code></pre>
<p>2) Convert the DataFrame to an RDD:</p>
<pre><code>scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at &lt;console&gt;:29
</code></pre>
<p>3) Print the RDD:</p>
<pre><code>scala> dfToRDD.collect
res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])
</code></pre>
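<p>The resulting RDD contains untyped Row objects; as a small follow-up sketch (same df as above), fields are read positionally or by name:</p>
<pre><code>// getAs[T]("col") extracts a typed field from each Row
val names = df.rdd.map(row => row.getAs[String]("name")).collect()
// names: Array[String] = Array(Michael, Andy, Justin)
</code></pre>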
<h2 id=23-dataset>2.3 DataSet</h2>
<p>A Dataset is a strongly typed collection of data, so the corresponding type information must be provided.</p>
<h3 id=231>2.3.1 Creation</h3>
<p>1) Define a case class:</p>
<pre><code>scala> case class Person(name: String, age: Long)
defined class Person
</code></pre>
<p>2) Create a DataSet:</p>
<pre><code>scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
</code></pre>
<h3 id=232-rdddataset>2.3.2 Converting an RDD to a DataSet</h3>
<p>SparkSQL can automatically convert an RDD containing case-class instances: the case class defines the table structure, and its attributes become the column names via reflection.</p>
<p>1) Create an RDD:</p>
<pre><code>scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at &lt;console&gt;:27
</code></pre>
<p>2) Define a case class:</p>
<pre><code>scala> case class Person(name: String, age: Long)
defined class Person
</code></pre>
<p>3) Convert the RDD to a DataSet:</p>
<pre><code>scala> peopleRDD.map(line => {val para = line.split(","); Person(para(0), para(1).trim.toInt)}).toDS()
</code></pre>
<h3 id=233-datasetrdd>2.3.3 Converting a DataSet to an RDD</h3>
<p>Simply call the rdd method.</p>
<p>1) Create a DataSet:</p>
<pre><code>scala> val DS = Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
</code></pre>
<p>2) Convert the DataSet to an RDD:</p>
<pre><code>scala> DS.rdd
res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at &lt;console&gt;:28
</code></pre>
<h2 id=24-dataframedataset>2.4 Interoperating Between DataFrame and DataSet</h2>
<p><strong>1. DataFrame to DataSet</strong></p>
<p>1) Create a DataFrame:</p>
<pre><code>scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
</code></pre>
<p>2) Define a case class:</p>
<pre><code>scala> case class Person(name: String, age: Long)
defined class Person
</code></pre>
<p>3) Convert the DataFrame to a DataSet:</p>
<pre><code>scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
</code></pre>
<p><strong>2. DataSet to DataFrame</strong></p>
<p>1) Define a case class:</p>
<pre><code>scala> case class Person(name: String, age: Long)
defined class Person
</code></pre>
<p>2) Create a DataSet:</p>
<pre><code>scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
</code></pre>
<p>3) Convert the DataSet to a DataFrame:</p>
<pre><code>scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
</code></pre>
<p>4) Show it:</p>
<pre><code>scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
</code></pre>
<h3 id=241-datasetdataframe>2.4.1 DataSet to DataFrame</h3>
<p>This direction is simple, since it just wraps the case class into Rows.</p>
<p>(1) Import the implicit conversions:</p>
<pre><code>import spark.implicits._
</code></pre>
<p>(2) Convert:</p>
<pre><code>val testDF = testDS.toDF
</code></pre>
<h3 id=242-dataframedataset>2.4.2 DataFrame to DataSet</h3>
<p>(1) Import the implicit conversions:</p>
<pre><code>import spark.implicits._
</code></pre>
<p>(2) Define a case class:</p>
<pre><code>case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types
</code></pre>
<p>(3) Convert:</p>
<pre><code>val testDS = testDF.as[Coltest]
</code></pre>
<p>This approach supplies the type of every column and then uses the as method to produce a Dataset; it is extremely convenient when you have data as a DataFrame but need to work on individual fields. When using these operations, always remember <code>import spark.implicits._</code>, otherwise toDF and toDS are unavailable.</p>
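<p>Putting the pieces together, here is a compact round-trip sketch (assuming a spark-shell session, so <code>spark</code> and <code>sc</code> exist; the WordCount case class is invented for the example):</p>
<pre><code>import spark.implicits._

case class WordCount(word: String, n: Int)     // hypothetical schema for the example

val rdd  = sc.makeRDD(Seq(("a", 1), ("b", 2)))
val df   = rdd.toDF("word", "n")               // RDD -> DataFrame
val ds   = df.as[WordCount]                    // DataFrame -> Dataset (columns matched by name)
val back = ds.rdd                              // Dataset -> RDD[WordCount]
</code></pre>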
<h2 id=25-rdddataframedataset>2.5 RDD, DataFrame and DataSet</h2>
<p><img alt src=634ed8781087e6ffaa4d0a24ef7cc676.tiff></p>
<p>In SparkSQL, Spark gives us two new abstractions, DataFrame and DataSet. How do they differ from RDD? First, by the version in which each appeared:</p>
<p>RDD (Spark 1.0) -> DataFrame (Spark 1.3) -> Dataset (Spark 1.6)</p>
<p>Given the same data, all three structures produce the same results when computed; what differs is their execution efficiency and execution style.</p>
<p>In later Spark versions, the DataSet is expected to gradually replace RDD and DataFrame as the sole API.</p>
<h3 id=251>2.5.1 What the Three Have in Common</h3>
<p>1. RDD, DataFrame and Dataset are all distributed, resilient datasets on the Spark platform, convenient for processing very large data.</p>
<p>2. All three are lazy: creation and transformations such as map do not execute immediately; traversal and computation only begin when an action such as foreach is encountered.</p>
<p>3. All three cache computations automatically according to Spark's memory situation, so even very large data volumes need not cause out-of-memory errors.</p>
<p>4. All three have the concept of partitions.</p>
<p>5. All three share many functions, such as filter and sorting.</p>
<p>6. Many operations on DataFrame and Dataset need this import for support:</p>
<pre><code>import spark.implicits._
</code></pre>
<p>7. Both DataFrame and Dataset can use pattern matching to obtain the value and type of each field.</p>
<p>DataFrame:</p>
<pre><code>testDF.map {
  case Row(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}
</code></pre>
<p>Dataset:</p>
<pre><code>case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

testDS.map {
  case Coltest(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}
</code></pre>
<h3 id=252>2.5.2 How the Three Differ</h3>
<p><strong>1. RDD:</strong></p>
<p>1) RDDs are typically used together with spark mllib.</p>
<p>2) RDDs do not support sparksql operations.</p>
<p><strong>2. DataFrame:</strong></p>
<p>1) Unlike RDD and Dataset, every row of a DataFrame has the fixed type Row; column values cannot be accessed directly and must be extracted by parsing, for example:</p>
<pre><code>testDF.foreach { line =>
  val col1 = line.getAs[String]("col1")
  val col2 = line.getAs[String]("col2")
}
</code></pre>
<p>2) DataFrame and Dataset are generally not used together with spark mllib.</p>
<p>3) Both DataFrame and Dataset support sparksql operations such as select and groupby, and both can be registered as temporary tables/views for SQL queries, for example:</p>
<pre><code>dataDF.createOrReplaceTempView("tmp")
spark.sql("select ROW, DATE from tmp where DATE is not null order by DATE").show(100, false)
</code></pre>
<p>4) DataFrame and Dataset support some particularly convenient save formats, such as CSV with a header row, so the column names are obvious at a glance:</p>
<pre><code>// save
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

// read
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
val datarDF = spark.read.options(options).format("com.atguigu.spark.csv").load()
</code></pre>
<p>Saving this way gives the mapping between field names and columns for free, and the delimiter can be chosen freely.</p>
<p><strong>3. Dataset:</strong></p>
<p>1) Dataset and DataFrame have exactly the same member functions; the only difference is the type of each row.</p>
<p>2) A DataFrame can also be written Dataset[Row]: each row has type Row, and without parsing there is no way to know which fields a row holds or what their types are; you can only use the getAs method mentioned above, or the pattern matching from point 7 of the commonalities, to pull out specific fields. In a Dataset, by contrast, the row type is open-ended, and after defining a case class you can freely access any per-row information:</p>
<pre><code>case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

/*
 rdd
 ("a", 1)
 ("b", 1)
 ("a", 1)
*/
val test: Dataset[Coltest] = rdd.map { line =>
  Coltest(line._1, line._2)
}.toDS

test.map { line =>
  println(line.col1)
  println(line.col2)
}
</code></pre>
<p>As you can see, a Dataset is very convenient when you need to access particular fields of a row. However, when writing highly generic functions, the row type of a Dataset is unknown and could be any case class, so no single signature fits; in that situation a DataFrame, i.e. Dataset[Row], solves the problem nicely.</p>
<h2 id=26-ideasparksql>2.6 Creating a SparkSQL Program in IDEA</h2>
<p>Packaging and running the program in IDEA works the same way as for SparkCore; the Maven dependencies need one new entry:</p>
<pre><code>&lt;dependency&gt;
    &lt;groupId&gt;org.apache.spark&lt;/groupId&gt;
    &lt;artifactId&gt;spark-sql_2.11&lt;/artifactId&gt;
    &lt;version&gt;2.1.1&lt;/version&gt;
&lt;/dependency&gt;
</code></pre>
<p>The program:</p>
<pre><code>package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HelloWorld {

  def main(args: Array[String]) {
    // create the SparkSession and set the application name
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("data/people.json")

    // Displays the content of the DataFrame to stdout
    df.show()

    df.filter($"age" > 21).show()

    df.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()
  }

}
</code></pre>
<h2 id=27>2.7 User-Defined Functions</h2>
<p>In the shell, users can define their own functions through the spark.udf facility.</p>
<h3 id=271-udf>2.7.1 User-Defined UDFs</h3>
<pre><code>scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(&lt;function1&gt;,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("people")

scala> spark.sql("Select addName(name), age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
|     Name:Michael|null|
|        Name:Andy|  30|
|      Name:Justin|  19|
+-----------------+----+
</code></pre>
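<p>The same function can also be used DSL-style, without SQL registration, through the udf helper; a minimal sketch with the df above:</p>
<pre><code>import org.apache.spark.sql.functions.udf

// wrap an ordinary Scala function as a column expression
val addName = udf((x: String) => "Name:" + x)
df.select(addName($"name"), $"age").show()
</code></pre>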
<h3 id=272>2.7.2 User-Defined Aggregate Functions</h3>
<p>Both the strongly typed Dataset and the weakly typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max() and min(). Beyond these, users can define their own aggregate functions.</p>
<p><strong>Weakly typed user-defined aggregate functions</strong> are implemented by extending UserDefinedAggregateFunction. Below is a custom aggregate function computing the average salary:</p>
<pre><code>import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // data type of the aggregate function's input arguments
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // data types of the values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // data type of the return value
  def dataType: DataType = DoubleType
  // whether the function always returns the same output for the same input
  def deterministic: Boolean = true
  // initialization
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // running total of salaries
    buffer(0) = 0L
    // running count of salaries
    buffer(1) = 0L
  }
  // merge data within the same executor
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // merge data across executors
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // compute the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// register the function
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
</code></pre>
<p><strong>Strongly typed user-defined aggregate functions</strong> are implemented by extending Aggregator. The same average-salary example:</p>
<pre><code>import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

// being strongly typed, the example uses case classes
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // a data structure holding the salary total and the salary count, both starting at 0
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // merge results from different executors
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // compute the output
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // encoder for the intermediate value type, which must be a case class;
  // Encoders.product is the encoder for Scala tuples and case classes
  def bufferEncoder: Encoder[Average] = Encoders.product
  // encoder for the final output value
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

import spark.implicits._

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
</code></pre>
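<p>A side note beyond the Spark 2.x scope of this text (an assumption about later releases): since Spark 3.0, UserDefinedAggregateFunction is deprecated, and an Aggregator over the input column type can be registered for SQL use directly. A rough sketch:</p>
<pre><code>// hypothetical Spark 3.x sketch; `Average` is the buffer case class from above
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, functions}

object MyAverage3 extends Aggregator[Long, Average, Double] {
  def zero: Average = Average(0L, 0L)
  def reduce(b: Average, salary: Long): Average = { b.sum += salary; b.count += 1; b }
  def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  def finish(r: Average): Double = r.sum.toDouble / r.count
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// register for SQL use; the input encoder is passed explicitly
spark.udf.register("myAverage", functions.udaf(MyAverage3, Encoders.scalaLong))
</code></pre>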
<h2 id=3-sparksql>Chapter 3: SparkSQL Data Sources</h2>
<h2 id=31>3.1 Generic Load/Save Methods</h2>
<h3 id=311>3.1.1 Specifying Options Manually</h3>
<p>The Spark SQL DataFrame interface supports operating on a variety of data sources. A DataFrame can be operated on in RDD fashion, and it can also be registered as a temporary view; once registered, SQL queries can be run against it.</p>
<p>Spark SQL's default data source is the Parquet format. When the data source is a Parquet file, Spark SQL can perform all operations on it conveniently. The configuration option spark.sql.sources.default changes the default data-source format.</p>
<pre><code>val df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
</code></pre>
<p>When the data source is not a Parquet file, its format must be specified manually. The format's full name is required (e.g. org.apache.spark.sql.parquet), but built-in formats can be referred to by their short names: json, parquet, jdbc, orc, libsvm, csv, text.</p>
<p>Data can be loaded generically with the read.load method provided by SparkSession, and saved with write and save:</p>
<pre><code>val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
</code></pre>
<p>In addition, SQL can be run directly on files:</p>
<pre><code>val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
sqlDF.show()
</code></pre>
<p>A full shell session:</p>
<pre><code>scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")

scala> peopleDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
17/09/05 04:21:11 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
</code></pre>
<h3 id=312>3.1.2 File Save Options</h3>
<p>Save operations can take a SaveMode, which defines how existing data is handled. Note that these save modes do not use any locking and are not atomic. Moreover, with Overwrite, the existing data is deleted before the new data is written. The SaveMode values are:</p>
<table> <thead> <tr> <th>Scala/Java</th> <th>Any Language</th> <th>Meaning</th> </tr> </thead> <tbody> <tr> <td>SaveMode.ErrorIfExists (default)</td> <td>"error" (default)</td> <td>throw an error if the data already exists</td> </tr> <tr> <td>SaveMode.Append</td> <td>"append"</td> <td>append to the existing data</td> </tr> <tr> <td>SaveMode.Overwrite</td> <td>"overwrite"</td> <td>overwrite the existing data</td> </tr> <tr> <td>SaveMode.Ignore</td> <td>"ignore"</td> <td>ignore the write if data already exists</td> </tr> </tbody> </table>
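<p>A save mode is applied through write.mode; a small sketch reusing peopleDF from above:</p>
<pre><code>import org.apache.spark.sql.SaveMode

// append to the existing Parquet directory instead of failing
peopleDF.write.mode(SaveMode.Append).parquet("hdfs://hadoop102:9000/namesAndAges.parquet")
// the string form is equivalent: peopleDF.write.mode("append")...
</code></pre>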
<h2 id=32-json>3.2 JSON Files</h2>
<p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().</p>
<p>Note: this is not a conventional JSON file; every line must be a complete JSON object.</p>
<pre><code>{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
</code></pre>
<pre><code>// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
</code></pre>
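<p>A side note beyond the Spark 2.1.1 used in this text (an assumption about later releases): from Spark 2.2 onward, a conventional multi-line JSON document can be read by setting the multiLine option:</p>
<pre><code>// pretty.json is a hypothetical path to a normal, pretty-printed JSON file
val prettyDF = spark.read.option("multiLine", true).json("pretty.json")
</code></pre>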
<p>(4) Write data to MySQL, option 1</p> <pre><code>jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  .option("dbtable", "dftable")
  .option("user", "root")
  .option("password", "000000")
  .save()
</code></pre> <p>(5) Write data to MySQL, option 2</p> <pre><code>jdbcDF2.write
  .jdbc("jdbc:mysql://hadoop102:3306/rdd", "db", connectionProperties)
</code></pre> <h2 id=35-hive>3.5 Hive</h2> <p>Apache Hive is the SQL engine on Hadoop, and Spark SQL can be built with or without Hive support. With Hive support, Spark SQL can access Hive tables, use UDFs (user-defined functions), and run the Hive query language (HiveQL/HQL). It is worth stressing that including the Hive libraries in Spark SQL does not require an existing Hive installation. In general it is best to build Spark SQL with Hive support so all of these features are available; if you downloaded a binary distribution of Spark, it should already be built with Hive support.</p> <p>To connect Spark SQL to an existing Hive deployment, copy hive-site.xml into Spark's configuration directory ($SPARK_HOME/conf). Spark SQL runs even without a Hive deployment: in that case it creates its own Hive metastore, called metastore_db, in the current working directory. In addition, tables created with HiveQL's CREATE TABLE (as opposed to CREATE EXTERNAL TABLE) are placed under /user/hive/warehouse on your default filesystem (HDFS if a configured hdfs-site.xml is on the classpath, the local filesystem otherwise).</p> <h3 id=351-hive>3.5.1 Embedded Hive</h3> <p>To use the embedded Hive, no setup is required; it just works. The warehouse location can be specified on first use with a parameter:</p> <pre><code>--conf spark.sql.warehouse.dir=hdfs://hadoop102/spark-warehouse
</code></pre> <p><img alt src=72c52ff0e49a14120219535b6604a683.png></p> <p><strong>Note:</strong> with the embedded Hive, since Spark 2.0 spark.sql.warehouse.dir specifies the warehouse location. If you want that path to be on HDFS, add core-site.xml and hdfs-site.xml to Spark's conf directory; otherwise the warehouse directory is created only on the master node's local filesystem, and queries will fail with file-not-found errors. When switching to HDFS in this situation, delete the metastore_db directory and restart the cluster.</p> <h3 id=352-hive>3.5.2 External Hive</h3> <p>To connect to an external, already-deployed Hive, follow these steps.</p> <ol> <li> <p>Copy or symlink Hive's hive-site.xml into the conf directory under the Spark installation.</p> </li> <li> <p>Start the spark shell, making sure to include the JDBC driver for the Hive metastore database:</p> </li> </ol> <pre><code>$ bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar
</code></pre> <h3 id=353-spark-sql-cli>3.5.3 Running the Spark SQL CLI</h3> <p>The Spark SQL CLI is a convenient way to run the Hive metastore service locally and execute queries from the command line. Start it from the Spark directory with:</p> <pre><code>./bin/spark-sql
</code></pre> <h3 id=354-hive>3.5.4 Using Hive in Code</h3> <p>(1) Add the dependencies:</p> <pre><code>&lt;!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.apache.spark&lt;/groupId&gt;
    &lt;artifactId&gt;spark-hive_2.11&lt;/artifactId&gt;
    &lt;version&gt;2.1.1&lt;/version&gt;
&lt;/dependency&gt;
&lt;!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.apache.hive&lt;/groupId&gt;
    &lt;artifactId&gt;hive-exec&lt;/artifactId&gt;
    &lt;version&gt;1.2.1&lt;/version&gt;
&lt;/dependency&gt;
</code></pre> <p>(2) Enable Hive support when creating the SparkSession (enableHiveSupport()):</p> <pre><code>import java.io.File
import org.apache.spark.sql.SparkSession

val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
</code></pre> <p>Note: <strong>spark.sql.warehouse.dir specifies a warehouse location and is needed for the embedded Hive. If you are using an external Hive instead, add hive-site.xml to the classpath.</strong></p>
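<p>With Hive support enabled, Hive tables can be created and queried directly through spark.sql. The following is a minimal sketch assuming the session above; the src table is illustrative, and kv1.txt is the sample file bundled with Spark's examples, neither is part of this walkthrough:</p> <pre><code>import spark.sql

// Create a Hive-managed table and load Spark's bundled sample data into it.
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// HiveQL queries return DataFrames like any other Spark SQL query.
sql("SELECT COUNT(*) FROM src").show()
</code></pre>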
<h2 id=4-spark-sql>Chapter 4 Spark SQL in Practice</h2> <h2 id=41>4.1 The Data</h2> <p>The dataset is a merchandise transaction dataset.</p> <p><img alt src=15b3c94e86d85225d5ba3a2cb1c2f08c.png></p> <p>Each order may contain multiple items, each order can produce multiple transactions, and different items have different unit prices.</p> <h2 id=42>4.2 Loading the Data</h2> <p>tbStock:</p> <pre><code>scala> case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
defined class tbStock

scala> val tbStockRdd = spark.sparkContext.textFile("tbStock.txt")
tbStockRdd: org.apache.spark.rdd.RDD[String] = tbStock.txt MapPartitionsRDD[1] at textFile at &lt;console&gt;:23

scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]

scala> tbStockDS.show()
+------------+----------+---------+
| ordernumber|locationid|   dateid|
+------------+----------+---------+
|BYSL00000893|      ZHAO|2007-8-23|
|BYSL00000897|      ZHAO|2007-8-24|
|BYSL00000898|      ZHAO|2007-8-25|
|BYSL00000899|      ZHAO|2007-8-26|
|BYSL00000900|      ZHAO|2007-8-26|
|BYSL00000901|      ZHAO|2007-8-27|
|BYSL00000902|      ZHAO|2007-8-27|
|BYSL00000904|      ZHAO|2007-8-28|
|BYSL00000905|      ZHAO|2007-8-28|
|BYSL00000906|      ZHAO|2007-8-28|
|BYSL00000907|      ZHAO|2007-8-29|
|BYSL00000908|      ZHAO|2007-8-30|
|BYSL00000909|      ZHAO| 2007-9-1|
|BYSL00000910|      ZHAO| 2007-9-1|
|BYSL00000911|      ZHAO|2007-8-31|
|BYSL00000912|      ZHAO| 2007-9-2|
|BYSL00000913|      ZHAO| 2007-9-3|
|BYSL00000914|      ZHAO| 2007-9-3|
|BYSL00000915|      ZHAO| 2007-9-4|
|BYSL00000916|      ZHAO| 2007-9-4|
+------------+----------+---------+
only showing top 20 rows
</code></pre> <p>tbStockDetail:</p> <pre><code>scala> case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable
defined class tbStockDetail

scala> val tbStockDetailRdd = spark.sparkContext.textFile("tbStockDetail.txt")
tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = tbStockDetail.txt MapPartitionsRDD[13] at textFile at &lt;console&gt;:23

scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr=> tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]

scala> tbStockDetailDS.show()
+------------+------+--------------+------+-----+------+
| ordernumber|rownum|        itemid|number|price|amount|
+------------+------+--------------+------+-----+------+
|BYSL00000893|     0|FS527258160501|    -1|268.0|-268.0|
|BYSL00000893|     1|FS527258169701|     1|268.0| 268.0|
|BYSL00000893|     2|FS527230163001|     1|198.0| 198.0|
|BYSL00000893|     3|24627209125406|     1|298.0| 298.0|
|BYSL00000893|     4|K9527220210202|     1|120.0| 120.0|
|BYSL00000893|     5|01527291670102|     1|268.0| 268.0|
|BYSL00000893|     6|QY527271800242|     1|158.0| 158.0|
|BYSL00000893|     7|ST040000010000|     8|  0.0|   0.0|
|BYSL00000897|     0|04527200711305|     1|198.0| 198.0|
|BYSL00000897|     1|MY627234650201|     1|120.0| 120.0|
|BYSL00000897|     2|01227111791001|     1|249.0| 249.0|
|BYSL00000897|     3|MY627234610402|     1|120.0| 120.0|
|BYSL00000897|     4|01527282681202|     1|268.0| 268.0|
|BYSL00000897|     5|84126182820102|     1|158.0| 158.0|
|BYSL00000897|     6|K9127105010402|     1|239.0| 239.0|
|BYSL00000897|     7|QY127175210405|     1|199.0| 199.0|
|BYSL00000897|     8|24127151630206|     1|299.0| 299.0|
|BYSL00000897|     9|G1126101350002|     1|158.0| 158.0|
|BYSL00000897|    10|FS527258160501|     1|198.0| 198.0|
|BYSL00000897|    11|ST040000010000|    13|  0.0|   0.0|
+------------+------+--------------+------+-----+------+
only showing top 20 rows
</code></pre> <p>tbDate:</p> <pre><code>scala> case class tbDate(dateid:String, years:Int, theyear:Int, month:Int, day:Int, weekday:Int, week:Int, quarter:Int, period:Int, halfmonth:Int) extends Serializable
defined class tbDate

scala> val tbDateRdd = spark.sparkContext.textFile("tbDate.txt")
tbDateRdd: org.apache.spark.rdd.RDD[String] = tbDate.txt MapPartitionsRDD[20] at textFile at &lt;console&gt;:23

scala> val tbDateDS = tbDateRdd.map(_.split(",")).map(attr=> tbDate(attr(0),attr(1).trim().toInt, attr(2).trim().toInt,attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
tbDateDS: org.apache.spark.sql.Dataset[tbDate] = [dateid: string, years: int ... 8 more fields]

scala> tbDateDS.show()
+---------+------+-------+-----+---+-------+----+-------+------+---------+
|   dateid| years|theyear|month|day|weekday|week|quarter|period|halfmonth|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
| 2003-1-1|200301|   2003|    1|  1|      3|   1|      1|     1|        1|
| 2003-1-2|200301|   2003|    1|  2|      4|   1|      1|     1|        1|
| 2003-1-3|200301|   2003|    1|  3|      5|   1|      1|     1|        1|
| 2003-1-4|200301|   2003|    1|  4|      6|   1|      1|     1|        1|
| 2003-1-5|200301|   2003|    1|  5|      7|   1|      1|     1|        1|
| 2003-1-6|200301|   2003|    1|  6|      1|   2|      1|     1|        1|
| 2003-1-7|200301|   2003|    1|  7|      2|   2|      1|     1|        1|
| 2003-1-8|200301|   2003|    1|  8|      3|   2|      1|     1|        1|
| 2003-1-9|200301|   2003|    1|  9|      4|   2|      1|     1|        1|
|2003-1-10|200301|   2003|    1| 10|      5|   2|      1|     1|        1|
|2003-1-11|200301|   2003|    1| 11|      6|   2|      1|     2|        1|
|2003-1-12|200301|   2003|    1| 12|      7|   2|      1|     2|        1|
|2003-1-13|200301|   2003|    1| 13|      1|   3|      1|     2|        1|
|2003-1-14|200301|   2003|    1| 14|      2|   3|      1|     2|        1|
|2003-1-15|200301|   2003|    1| 15|      3|   3|      1|     2|        1|
|2003-1-16|200301|   2003|    1| 16|      4|   3|      1|     2|        2|
|2003-1-17|200301|   2003|    1| 17|      5|   3|      1|     2|        2|
|2003-1-18|200301|   2003|    1| 18|      6|   3|      1|     2|        2|
|2003-1-19|200301|   2003|    1| 19|      7|   3|      1|     2|        2|
|2003-1-20|200301|   2003|    1| 20|      1|   4|      1|     2|        2|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
only showing top 20 rows
</code></pre> <p>Register the temporary views:</p> <pre><code>scala> tbStockDS.createOrReplaceTempView("tbStock")

scala> tbDateDS.createOrReplaceTempView("tbDate")

scala> tbStockDetailDS.createOrReplaceTempView("tbStockDetail")
</code></pre>
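<p>As a quick, hedged sanity check (not part of the original walkthrough), the catalog can confirm that the three views are visible before running the queries below:</p> <pre><code>// List the registered temporary views and spot-check a row count.
spark.catalog.listTables().show()
spark.sql("SELECT COUNT(*) FROM tbStock").show()
</code></pre>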
<h2 id=43>4.3 Order Count and Total Sales per Year</h2> <p>Compute, across all orders, the number of sales orders and the total sales amount per year.</p> <p>Join the three tables, then count orders with COUNT(DISTINCT a.ordernumber) and total sales with SUM(b.amount).</p> <p><img alt src=15b3c94e86d85225d5ba3a2cb1c2f08c.png></p> <pre><code>SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount)
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear
ORDER BY c.theyear
</code></pre> <pre><code>spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear").show
</code></pre> <p>Result:</p> <pre><code>+-------+---------------------------+--------------------+
|theyear|count(DISTINCT ordernumber)|         sum(amount)|
+-------+---------------------------+--------------------+
|   2004|                       1094|   3268115.499199999|
|   2005|                       3828|1.3257564149999991E7|
|   2006|                       3772|1.3680982900000006E7|
|   2007|                       4885|1.6719354559999993E7|
|   2008|                       4861| 1.467429530000001E7|
|   2009|                       2619|   6323697.189999999|
|   2010|                         94|  210949.65999999997|
+-------+---------------------------+--------------------+
</code></pre> <h2 id=44>4.4 Sales Amount of the Largest Order per Year</h2> <p>Goal: find the sales amount of the largest single order in each year.</p> <p><img alt src=15b3c94e86d85225d5ba3a2cb1c2f08c.png></p> <p>Step 1. Compute the total sales amount of each order in each year:</p> <pre><code>SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber
</code></pre> <pre><code>spark.sql("SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber").show
</code></pre> <p>Result:</p> <pre><code>+----------+------------+------------------+
|    dateid| ordernumber|       SumOfAmount|
+----------+------------+------------------+
|  2008-4-9|BYSL00001175|             350.0|
| 2008-5-12|BYSL00001214|             592.0|
| 2008-7-29|BYSL00011545|            2064.0|
|  2008-9-5|DGSL00012056|            1782.0|
| 2008-12-1|DGSL00013189|             318.0|
|2008-12-18|DGSL00013374|             963.0|
|  2009-8-9|DGSL00015223|            4655.0|
| 2009-10-5|DGSL00015585|            3445.0|
| 2010-1-14|DGSL00016374|            2934.0|
| 2006-9-24|GCSL00000673|3556.1000000000004|
| 2007-1-26|GCSL00000826| 9375.199999999999|
| 2007-5-24|GCSL00001020| 6171.300000000002|
|  2008-1-8|GCSL00001217|            7601.6|
| 2008-9-16|GCSL00012204|            2018.0|
| 2006-7-27|GHSL00000603|            2835.6|
|2006-11-15|GHSL00000741|           3951.94|
|  2007-6-6|GHSL00001149|               0.0|
| 2008-4-18|GHSL00001631|              12.0|
| 2008-7-15|GHSL00011367|             578.0|
|  2009-5-8|GHSL00014637|            1797.6|
+----------+------------+------------------+
</code></pre> <p>Step 2. Using the result of step 1 as a base table, join it with tbDate on dateid and take the per-year maximum:</p> <pre><code>SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount
FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber
) c
JOIN tbDate d ON c.dateid = d.dateid
GROUP BY theyear
ORDER BY theyear DESC
</code></pre> <pre><code>spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC").show
</code></pre> <p>Result:</p> <pre><code>+-------+------------------+
|theyear|       SumOfAmount|
+-------+------------------+
|   2010|13065.280000000002|
|   2009|25813.200000000008|
|   2008|           55828.0|
|   2007|          159126.0|
|   2006|           36124.0|
|   2005|38186.399999999994|
|   2004| 23656.79999999997|
+-------+------------------+
</code></pre>
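<p>For comparison, the same two-step computation can be written against the Datasets registered in 4.2 without SQL strings. This is a hedged sketch of an equivalent Dataset API version, not part of the original walkthrough; it should reproduce the table above:</p> <pre><code>import org.apache.spark.sql.functions._

// Step 1: total amount per order (per date), as in the subquery above.
val orderTotals = tbStockDS
  .join(tbStockDetailDS, "ordernumber")
  .groupBy("dateid", "ordernumber")
  .agg(sum("amount").as("SumOfAmount"))

// Step 2: join with the date table and take the per-year maximum.
orderTotals
  .join(tbDateDS, "dateid")
  .groupBy("theyear")
  .agg(max("SumOfAmount").as("SumOfAmount"))
  .orderBy(col("theyear").desc)
  .show()
</code></pre>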
<h2 id=45>4.5 Best-Selling Item per Year</h2> <p>Goal: find the best-selling item of each year (the item whose sales amount is the highest in that year).</p> <p><img alt src=15b3c94e86d85225d5ba3a2cb1c2f08c.png></p> <p>Step 1. Compute the sales amount of each item in each year:</p> <pre><code>SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
</code></pre> <pre><code>spark.sql("SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid").show
</code></pre> <p>Result:</p> <pre><code>+-------+--------------+------------------+
|theyear|        itemid|       SumOfAmount|
+-------+--------------+------------------+
|   2004|43824480810202|           4474.72|
|   2006|YA214325360101|             556.0|
|   2006|BT624202120102|             360.0|
|   2007|AK215371910101|24603.639999999992|
|   2008|AK216169120201|29144.199999999997|
|   2008|YL526228310106|16073.099999999999|
|   2009|KM529221590106| 5124.800000000001|
|   2004|HT224181030201|2898.6000000000004|
|   2004|SG224308320206|           7307.06|
|   2007|04426485470201|14468.800000000001|
|   2007|84326389100102|           9134.11|
|   2007|B4426438020201|           19884.2|
|   2008|YL427437320101|12331.799999999997|
|   2008|MH215303070101|            8827.0|
|   2009|YL629228280106|           12698.4|
|   2009|BL529298020602|            2415.8|
|   2009|F5127363019006|             614.0|
|   2005|24425428180101|          34890.74|
|   2007|YA214127270101|             240.0|
|   2007|MY127134830105|          11099.92|
+-------+--------------+------------------+
</code></pre> <p>Step 2. On top of step 1, compute the maximum per-item amount in each year:</p> <pre><code>SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) d
GROUP BY d.theyear
</code></pre> <pre><code>spark.sql("SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear").show
</code></pre> <p>Result:</p> <pre><code>+-------+------------------+
|theyear|       MaxOfAmount|
+-------+------------------+
|   2007|           70225.1|
|   2006|          113720.6|
|   2004|53401.759999999995|
|   2009|           30029.2|
|   2005|56627.329999999994|
|   2010|            4494.0|
|   2008| 98003.60000000003|
+-------+------------------+
</code></pre> <p>Step 3. Join the per-year maximum back to the per-item totals, on both the year and the amount, to recover the full row of the best-selling item:</p> <pre><code>SELECT DISTINCT e.theyear, e.itemid, f.MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) e
JOIN (SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) d
GROUP BY d.theyear
) f ON e.theyear = f.theyear
AND e.SumOfAmount = f.MaxOfAmount
ORDER BY e.theyear
</code></pre>
<pre><code>spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear").show
</code></pre> <p>Result:</p> <pre><code>+-------+--------------+------------------+
|theyear|        itemid|       maxofamount|
+-------+--------------+------------------+
|   2004|JY424420810101|53401.759999999995|
|   2005|24124118880102|56627.329999999994|
|   2006|JY425468460101|          113720.6|
|   2007|JY425468460101|           70225.1|
|   2008|E2628204040101| 98003.60000000003|
|   2009|YL327439080102|           30029.2|
|   2010|SQ429425090101|            4494.0|
+-------+--------------+------------------+
</code></pre>
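<p>The same answer can be obtained in one pass with a window function, which avoids repeating the aggregation subquery. This is a hedged alternative sketch, not part of the original walkthrough; it should agree with the result above:</p> <pre><code>import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Sales amount per (year, item), as in step 1.
val perItem = tbStockDS
  .join(tbStockDetailDS, "ordernumber")
  .join(tbDateDS, "dateid")
  .groupBy("theyear", "itemid")
  .agg(sum("amount").as("SumOfAmount"))

// Rank items within each year by amount and keep the top one.
val byYear = Window.partitionBy("theyear").orderBy(col("SumOfAmount").desc)
perItem.withColumn("rn", row_number().over(byYear))
  .where(col("rn") === 1)
  .drop("rn")
  .orderBy("theyear")
  .show()
</code></pre>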
</article> </div> </div> </main> </div> </body> </html>